package com.msunsoft.main.etl.engine.impl;

import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.Timer;
import java.util.TimerTask;
import java.util.UUID;

import javax.annotation.Resource;

import org.pentaho.di.core.Result;
import org.pentaho.di.core.database.Database;
import org.pentaho.di.core.exception.KettleException;
import org.pentaho.di.core.gui.JobTracker;
import org.pentaho.di.core.logging.ChannelLogTable;
import org.pentaho.di.core.logging.JobEntryLogTable;
import org.pentaho.di.core.logging.JobLogTable;
import org.pentaho.di.core.logging.LogTableField;
import org.pentaho.di.core.logging.LogTableInterface;
import org.pentaho.di.i18n.BaseMessages;
import org.pentaho.di.job.Job;
import org.pentaho.di.job.JobAdapter;
import org.pentaho.di.job.JobEntryResult;
import org.pentaho.di.job.JobMeta;
import org.pentaho.di.repository.LongObjectId;
import org.pentaho.di.repository.ObjectId;
import org.pentaho.di.repository.kdr.KettleDatabaseRepository;
import org.pentaho.di.repository.kdr.KettleDatabaseRepositoryMeta;
import org.springframework.stereotype.Service;

import com.msunsoft.main.etl.engine.KettleEnvironmentRunEngineService;
import com.msunsoft.main.utils.Const;

/**
 * kettle 运行环境引擎，运行job
 * 
 * @author Administrator
 *
 */
@Service
public class KettleEnvironmentRunEngineServiceImpl implements KettleEnvironmentRunEngineService {

	@Resource(name = "etl.repositoryMeta")
	private KettleDatabaseRepositoryMeta repInfo;

	public static Map<String, Job> JobsMap = new HashMap<String, Job>();

	private KettleDatabaseRepository rep = new KettleDatabaseRepository();

	private KettleDatabaseRepository initKettleDatabaseRepository() throws KettleException {
		if (!rep.isConnected()) {
			rep.init(repInfo);
			rep.connect("admin", "admin");
		}
		return rep;
	}

	@Override
	public boolean runJobformation(Long id_job) {

		boolean isRes = false;
		try {
			// 资源库
			KettleDatabaseRepository rep = initKettleDatabaseRepository();

			ObjectId oId = new LongObjectId(id_job);
			final JobMeta jobMeta = rep.loadJob(oId, null);
			final Job job = new Job(rep, jobMeta);
			if (job != null) {
				job.getJobMeta().setInternalKettleVariables(job);
				addJobLogTable(jobMeta);
				job.start();
				
				final Timer timer = new Timer(job.getName() + " - interval logging timer");
				TimerTask timerTask = new TimerTask() {
					public void run() {
						try {
							int previousNrItems = -1;
							//writeJobEntryLogInformation(job, jobMeta, previousNrItems);
						} catch (Exception e) {
							System.out.println("Job.Exception.UnableToPerformIntervalLogging");
						}
					}
				};
				timer.schedule(timerTask, 1 * 1000, 10 * 1000);
				job.addJobListener(new JobAdapter() {
					public void jobFinished(Job job) {
						timer.cancel();
						try {
							int previousNrItems = -1;
							writeJobEntryLogInformation(job, jobMeta, previousNrItems);
						} catch (KettleException e) {
							e.printStackTrace();
						}
					}
				});
				JobsMap.put(String.valueOf(id_job), job);
				if (jobMeta != null) {
					if (!job.isStopped())
						isRes = true;
					else
						isRes = false;
				} else {
					String code = "401";
				}
			}
		} catch (KettleException e) {
			e.printStackTrace();//todo
		}

		return isRes;
	}

	@Override
	public boolean stopJobformation(Long WORKID) {
		String code = "200";
		String msg = "Stop Success";
		try {
			Job job = (Job) JobsMap.get(String.valueOf(WORKID));
			if (job == null) {
				code = "202";
				msg = "NoStart";
			} else if (job.isStopped()) {
				code = "201";
				msg = "NoStop";
			} else {
				job.stopAll();
				System.out.println(
						"SID: [" + job.getJobname() + "]" + " Stop Success,filename is[" + job.getFilename() + "]");
			}
		} catch (Exception e) {
			msg = "Stop error:" + e.getMessage();
			code = "401";
		}
		return false;
	}

	@Override
	public boolean pauseTransformation(Long WORKID) {
		// TODO Auto-generated method stub
		return false;
	}

	/**
	 * 获取作业运行状态
	 */
	public String getJobStatusById(Long WORKID){
		Job job = (Job)JobsMap.get(String.valueOf(WORKID));
		if(job != null)
			return job.getStatus();
		else
			return "未运行";
	}

	private void addJobLogTable(JobMeta jobMeta) {
		JobLogTable logTable = jobMeta.getJobLogTable();
		logTable.setTableName(Const.LOG_JOBS);
		logTable.setLogTableDatabaseMeta(Const.getLogTableDatabaseMeta());
		List fields = logTable.getFields();
		LogTableField field = (LogTableField) fields.get(fields.size() - 1);
		field.setEnabled(true);
		logTable.setFields(fields);
		logTable.setLogSizeLimit("5000");
		logTable.setLogInterval("10");
	}
	
	/**
	 * 重新jobEntryLog，
	 * @param previousNrItems 
	 */
	
	protected void writeJobEntryLogInformation(Job job, JobMeta jobMeta, int previousNrItems) throws KettleException {
		
		Database db = null;
		JobTracker jobTracker = job.getJobTracker();
		int nrItems = jobTracker.getTotalNumberOfItems();
		try {
			db = new Database(Const.logTableDatabaseMeta);
			db.connect();
			db.setCommit(10);

			if (nrItems!=previousNrItems)
	        {
				String jobName = jobTracker.getJobName();
				  String id = job.getLogChannel().getLogChannelId();
				  Map<String,String> preStepId = new HashMap<String,String>();
				  for (int i=0;i<jobTracker.nrJobTrackers();i++)
	              {
	                  addTrackerToTree(jobTracker.getJobTracker(i),id,preStepId,db);
	              }
	              previousNrItems = nrItems;
	        }

		} catch (Exception e) {
			throw new KettleException("Job.Exception.UnableToJobEntryInformationToLogTable", e);
		} finally {
			if (!db.isAutoCommit())
				db.commit(true);
			db.disconnect();
		}
	}
	
	private void addTrackerToTree(JobTracker jobTracker,String parentId,Map preStepId, Database db){
		try
        {
            if (jobTracker!=null)
            {
            	
            	if (jobTracker.nrJobTrackers()>0)
                {
                    for (int i=0;i<jobTracker.nrJobTrackers();i++)
                    {	
                        addTrackerToTree(jobTracker.getJobTracker(i),(String)preStepId.get(jobTracker.getJobName()), preStepId, db);
                    }
//                  insertSQL.append("insert into etl_log_job_metric(CHANNEL_ID,PARENT_CHANNEL_ID,JOBJOBENTRY,COMMENTS,RESULT,NR,REASON) values(");
//                	insertSQL.append("'").append(id).append("',");
//                	insertSQL.append("'").append(parentId).append("',");
//                	insertSQL.append("'").append(jobEntryName).append("',");
                }else
                {
                	String id = UUID.randomUUID().toString();
                	StringBuffer insertSQL = new StringBuffer();
                	insertSQL.append("insert into etl_log_job_metric(CHANNEL_ID,PARENT_CHANNEL_ID,JOBJOBENTRY,COMMENTS,RESULT,NR,REASON) values(");
                	insertSQL.append("'").append(id).append("',");
                	insertSQL.append("'").append(parentId).append("',");
                	JobEntryResult result = jobTracker.getJobEntryResult();
                    if (result!=null)
                    {
                        String jobEntryName = result.getJobEntryName();
                        if (!Const.isEmpty(jobEntryName))
                        {
                        	preStepId.put(jobEntryName, id);
                        	insertSQL.append("'").append(jobEntryName).append("',");
                        }
                        else
                        {
                        	insertSQL.append("'").append(jobTracker.getJobName()).append("',");
                        }
                        String comment = result.getComment();
                        if (comment!=null)
                        {
                        	insertSQL.append("'").append(comment).append("',");
                        }else{
                        	insertSQL.append("'").append("").append("',");
                        }
                
                        Result res = result.getResult();
                        if (res!=null)
                        {
                        	insertSQL.append("'").append(res.getResult()?"Success":"Failure").append("',");
                        	insertSQL.append("'").append(Long.toString(res.getEntryNr())).append("',");
                        }else{
                        	insertSQL.append("'").append("").append("',");
                        	insertSQL.append("'").append("").append("',");
                        }
                        String reason = result.getReason();
                        if (reason!=null)
                        {
                        	insertSQL.append("'").append(reason).append("')");
                        }else{
                        	insertSQL.append("'").append("").append("')");
                        }
//                        Date logDate = result.getLogDate();
//                        if (logDate!=null)
//                        {
//                        	insertSQL.append("'").append(new SimpleDateFormat("yyyy/MM/dd HH:mm:ss").format(logDate)).append("')");
//                        }
                    }
                	db.execStatement(insertSQL.toString());
                }
            }
        }
        catch(Exception e)
        {
            System.out.println(Const.getStackTracker(e));
        }
	}

}
